實作 Day23 的簡單的 gRPC 應用,來嘗試雙向串流
透過官方的範例,實作 golang 的 gRPC 中的雙向串流。
撰寫協定
在協定用 stream 來代表串流物件,設定 RouteChat 的 rpc 方法中,雙向都傳送串流物件
syntax = "proto3";
service Portal {
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
}
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
message RouteNote {
Point location = 1;
string message = 2;
}
服務端
package main
import (
"fmt"
"io"
"log"
"net"
"sync"
pb "github.com/cyan92128505/cyangrpc/pb"
"google.golang.org/grpc"
)
type PortalService struct {
mu sync.Mutex
routeNotes map[string][]*pb.RouteNote
}
func (s *PortalService) RouteChat(stream pb.Portal_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key], in)
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn, s.routeNotes[key])
s.mu.Unlock()
for _, note := range rn {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
func serialize(point *pb.Point) string {
return fmt.Sprintf("%d %d", point.Latitude, point.Longitude)
}
func main() {
listenPort, err := net.Listen("tcp", ":19003")
if err != nil {
log.Fatalln(err)
}
server := grpc.NewServer()
portalService := &PortalService{routeNotes: make(map[string][]*pb.RouteNote)}
pb.RegisterPortalServer(server, portalService)
log.Println("server on")
server.Serve(listenPort)
}
用戶端
package main
import (
"context"
"io"
"log"
"time"
pb "github.com/cyan92128505/cyangrpcclient/pb"
"google.golang.org/grpc"
)
func runRouteChat(client pb.PortalClient) {
notes := []*pb.RouteNote{
{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "First message"},
{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Second message"},
{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Third message"},
{Location: &pb.Point{Latitude: 0, Longitude: 1}, Message: "Fourth message"},
{Location: &pb.Point{Latitude: 0, Longitude: 2}, Message: "Fifth message"},
{Location: &pb.Point{Latitude: 0, Longitude: 3}, Message: "Sixth message"},
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := client.RouteChat(ctx)
if err != nil {
log.Fatalf("%v.RouteChat(_) = _, %v", client, err)
}
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
}
func main() {
conn, err := grpc.Dial("127.0.0.1:19003", grpc.WithInsecure())
if err != nil {
log.Fatal("client connection error:", err)
}
defer conn.Close()
client := pb.NewPortalClient(conn)
runRouteChat(client)
}
服務端和用戶端,都是使用Recv
方法和Send
方法來做接收和傳送,並等待io.EOF
作為串流的結束,
對微服務架構來說,雙向串流是方便的傳輸方式。